% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator).

-export([
    replicate/2,
    replication_states/0,
    job/1,
    doc/3,
    active_doc/2,
    info_from_doc/2,
    restart_job/1
]).

-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include_lib("mem3/include/mem3.hrl").

% Every state name returned by replication_states/0. The comment above each
% entry describes what that state means for a replication.
-define(REPLICATION_STATES, [
    % Just added to scheduler
    initializing,
    % Could not be turned into a replication job
    error,
    % Scheduled and running
    running,
    % Scheduled and waiting to run
    pending,
    % Scheduled but crashing, backed off by the scheduler
    crashing,
    % Non-continuous (normal) completed replication
    completed,
    % Terminal failure, will not be retried anymore
    failed
]).

-import(couch_util, [
    get_value/2,
    get_value/3
]).

% Start or cancel the replication described by PostBody on behalf of Ctx.
%
% PostBody is parsed with couch_replicator_parse:parse_rep_doc/2 and the
% resulting #rep{} is stamped with the current time as its start time.
%
% * When the parsed options contain `cancel`, the job named by the `id` option
%   (or, when that option is absent, by the id of the parsed #rep{}) is
%   cancelled, provided check_authorization/2 returns `ok`. An unknown job
%   yields {error, not_found}.
% * Otherwise the job is added to the scheduler. Continuous jobs return
%   immediately; for all other jobs this call blocks until the job reports a
%   result to the listener registered here.
%
% The result listener is always stopped before returning, including when
% scheduling or waiting raises, so it cannot outlive this call.
-spec replicate({[_]}, any()) ->
    {ok, {continuous, binary()}}
    | {ok, {[_]}}
    | {ok, {cancelled, binary()}}
    | {error, any()}
    | no_return().
replicate(PostBody, Ctx) ->
    {ok, Rep0} = couch_replicator_parse:parse_rep_doc(PostBody, Ctx),
    Rep = Rep0#rep{start_time = os:timestamp()},
    ok = couch_replicator_utils:valid_endpoint_protocols_log(Rep),
    #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
    case get_value(cancel, Options, false) of
        true ->
            CancelRepId =
                case get_value(id, Options, nil) of
                    nil ->
                        RepId;
                    RepId2 ->
                        RepId2
                end,
            case check_authorization(CancelRepId, UserCtx) of
                ok ->
                    cancel_replication(CancelRepId);
                not_found ->
                    {error, not_found}
            end;
        false ->
            % The return value is intentionally ignored: `not_found` only means
            % that no job with this id is scheduled yet. A caller that may not
            % touch an existing job is rejected by an exception raised from
            % inside check_authorization/2 (see its unit tests).
            check_authorization(RepId, UserCtx),
            {ok, Listener} = rep_result_listener(RepId),
            try
                do_replication_loop(Rep)
            after
                % Runs on success and on failure alike. Without it a crash in
                % do_replication_loop/1 that the caller survives would leave
                % the linked listener running and forwarding events forever.
                couch_replicator_notifier:stop(Listener)
            end
    end.

% Hand Rep to the scheduler and produce the reply for replicate/2.
%
% A continuous job is acknowledged right away with its full id (base id and
% extension concatenated) as a binary. Any other job blocks in
% wait_for_result/1 until a result for its id arrives.
-spec do_replication_loop(#rep{}) ->
    {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
do_replication_loop(#rep{id = {Base, Suffix} = RepId, options = Opts} = Rep) ->
    ok = couch_replicator_scheduler:add_job(Rep),
    Continuous = get_value(continuous, Opts, false),
    case Continuous of
        false ->
            wait_for_result(RepId);
        true ->
            FullId = Base ++ Suffix,
            {ok, {continuous, ?l2b(FullId)}}
    end.

% Start a linked notifier that relays events for RepId to the calling process.
%
% The callback forwards, unchanged, every 3-tuple event whose second element
% equals RepId and ignores everything else. Returns {ok, ListenerPid}.
-spec rep_result_listener(rep_id()) -> {ok, pid()}.
rep_result_listener(RepId) ->
    Caller = self(),
    Forward = fun
        ({_Tag, EventRepId, _Payload} = Event) when EventRepId =:= RepId ->
            Caller ! Event;
        (_Unrelated) ->
            ok
    end,
    {ok, _Pid} = couch_replicator_notifier:start_link(Forward).

% Block until a `finished` or `error` event for RepId is in the mailbox.
%
% There is deliberately no timeout clause: the call waits for as long as it
% takes for one of the two events to arrive.
-spec wait_for_result(rep_id()) ->
    {ok, {[_]}} | {error, any()}.
wait_for_result(RepId) ->
    receive
        {error, RepId, Why} ->
            {error, Why};
        {finished, RepId, Outcome} ->
            {ok, Outcome}
    end.

% Remove the job identified by RepId from the scheduler, if it is known.
%
% Logs the attempt and its outcome. Returns the full id (base id and extension
% concatenated) as a binary on success, or {error, not_found} when the
% scheduler has no state for RepId.
-spec cancel_replication(rep_id()) ->
    {ok, {cancelled, binary()}} | {error, not_found}.
cancel_replication({Base, Ext} = RepId) ->
    IdStr = Base ++ Ext,
    couch_log:notice("Canceling replication '~s' ...", [IdStr]),
    case couch_replicator_scheduler:rep_state(RepId) of
        nil ->
            couch_log:notice("Replication '~s' not found", [IdStr]),
            {error, not_found};
        #rep{} ->
            ok = couch_replicator_scheduler:remove_job(RepId),
            couch_log:notice("Replication '~s' cancelled", [IdStr]),
            {ok, {cancelled, ?l2b(IdStr)}}
    end.

% Return the list of state names defined by the REPLICATION_STATES macro.
-spec replication_states() -> [atom()].
replication_states() -> ?REPLICATION_STATES.

% Return a displayable form of a replication endpoint with the password
% removed from its URL.
%
% Endpoint is parsed with couch_replicator_parse:parse_rep_db/3 and the URL of
% the resulting #httpdb{} is passed through couch_util:url_strip_password/1.
%
% Returns:
%   * the stripped URL as a binary when parsing succeeds;
%   * Endpoint unchanged when the parser rejects it with
%     `local_endpoints_not_supported`;
%   * `null` for any other parse failure, so that no fragment of a malformed
%     URL (which may still contain a password) is exposed.
-spec strip_url_creds(binary() | {[_]}) -> binary() | null.
strip_url_creds(Endpoint) ->
    try couch_replicator_parse:parse_rep_db(Endpoint, [], []) of
        #httpdb{url = Url} ->
            iolist_to_binary(couch_util:url_strip_password(Url))
    catch
        throw:{error, local_endpoints_not_supported} ->
            Endpoint;
        throw:{error, _} ->
            % Avoid exposing any part of the URL in case there is a password in
            % the malformed endpoint URL
            null;
        error:_ ->
            % Avoid exposing any part of the URL in case there is a password in
            % the malformed endpoint URL
            null
    end.

% Look up a scheduler job by its binary id on every node of the cluster.
%
% The id is normalised with couch_replicator_ids:convert/1 and
% couch_replicator_scheduler:job/1 is invoked through rpc:multicall/3. The
% first {ok, _} reply wins; all other replies and unreachable nodes are
% ignored.
-spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
job(JobId0) when is_binary(JobId0) ->
    JobId = couch_replicator_ids:convert(JobId0),
    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, job, [JobId]),
    Found = [Info || {ok, Info} <- Replies],
    case Found of
        [] ->
            {error, not_found};
        [First | _] ->
            {ok, First}
    end.

% Ask every node of the cluster to restart the scheduler job with this id.
%
% The id is normalised with couch_replicator_ids:convert/1 and
% couch_replicator_scheduler:restart_job/1 is invoked through rpc:multicall/3.
% The first {ok, _} reply is returned; if no node answers with one the result
% is {error, not_found}.
-spec restart_job(binary() | list() | rep_id()) ->
    {ok, {[_]}} | {error, not_found}.
restart_job(JobId0) ->
    JobId = couch_replicator_ids:convert(JobId0),
    {Replies, _BadNodes} =
        rpc:multicall(couch_replicator_scheduler, restart_job, [JobId]),
    Restarted = [Info || {ok, Info} <- Replies],
    case Restarted of
        [] ->
            {error, not_found};
        [First | _] ->
            {ok, First}
    end.

% Fetch the doc processor's view of a replication document.
%
% The node reported by couch_replicator_utils:owner/2 is asked first. If it
% does not know the document, the remaining live nodes that appear in
% mem3:shards/2 for this document are asked together. When the database is not
% known to the cluster (database_does_not_exist) only the local node is asked.
-spec active_doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
active_doc(DbName, DocId) ->
    try
        OwnerNode = couch_replicator_utils:owner(DbName, DocId),
        OwnerReply = active_doc_rpc(DbName, DocId, [OwnerNode]),
        case OwnerReply of
            {error, not_found} ->
                DocShards = mem3:shards(DbName, DocId),
                Reachable = [node() | nodes()] -- [OwnerNode],
                Others = [
                    N
                 || #shard{node = N} <- DocShards, lists:member(N, Reachable)
                ],
                active_doc_rpc(DbName, DocId, Others);
            {ok, DocInfo} ->
                {ok, DocInfo}
        end
    catch
        % Might be a local database
        error:database_does_not_exist ->
            active_doc_rpc(DbName, DocId, [node()])
    end.

% Ask the doc processor on the given nodes about a replication document.
%
% An empty node list is an immediate miss. A list holding only the local node
% is served by a direct call instead of an rpc. Otherwise all nodes are queried
% with rpc:multicall/4 and the first {ok, _} reply is returned.
-spec active_doc_rpc(binary(), binary(), [node()]) ->
    {ok, {[_]}} | {error, not_found}.
active_doc_rpc(_DbName, _DocId, []) ->
    {error, not_found};
active_doc_rpc(DbName, DocId, [Local]) when Local =:= node() ->
    couch_replicator_doc_processor:doc(DbName, DocId);
active_doc_rpc(DbName, DocId, Nodes) ->
    Args = [DbName, DocId],
    {Replies, _BadNodes} =
        rpc:multicall(Nodes, couch_replicator_doc_processor, doc, Args),
    case [Info || {ok, Info} <- Replies] of
        [] ->
            {error, not_found};
        [First | _] ->
            {ok, First}
    end.

% Return status information for a replication document.
%
% The doc processor is consulted first (active_doc/2); only when it has no
% entry is the document read from the database on behalf of UserCtx.
-spec doc(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
doc(RepDb, DocId, UserCtx) ->
    case active_doc(RepDb, DocId) of
        {error, not_found} ->
            doc_from_db(RepDb, DocId, UserCtx);
        {ok, _DocInfo} = Found ->
            Found
    end.

% Read a replication document with fabric:open_doc/3 and summarise it.
%
% The document is opened with UserCtx and the `ejson_body` option, converted
% to its JSON object form and passed to info_from_doc/2.
-spec doc_from_db(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
doc_from_db(RepDb, DocId, UserCtx) ->
    OpenOpts = [UserCtx, ejson_body],
    case fabric:open_doc(RepDb, DocId, OpenOpts) of
        {not_found, _Why} ->
            {error, not_found};
        {ok, Doc} ->
            EJson = couch_doc:to_json_obj(Doc, []),
            {ok, info_from_doc(RepDb, EJson)}
    end.

% Build the status object for a replication document from its EJSON body.
%
% Only the terminal states stored in the document are reported:
%   * `completed` - `info` is the `_replication_stats` object with its start
%     time entry removed; that entry becomes `start_time`.
%   * `failed`    - `info` is derived from `_replication_state_reason`,
%     `error_count` is 1 and `start_time` repeats the state time.
%   * any other state - `state`, `info` and `start_time` are all `null`.
%
% `source` and `target` are passed through strip_url_creds/1 so that no
% password ends up in the result. `id` is always `null` here.
-spec info_from_doc(binary(), {[_]}) -> {[_]}.
info_from_doc(RepDb, {Props}) ->
    DocId = get_value(<<"_id">>, Props),
    Source = get_value(<<"source">>, Props),
    Target = get_value(<<"target">>, Props),
    State0 = state_atom(get_value(<<"_replication_state">>, Props, null)),
    StateTime = get_value(<<"_replication_state_time">>, Props, null),
    {State, StateInfo, ErrorCount, StartTime} =
        doc_state_info(State0, StateTime, Props),
    {[
        {doc_id, DocId},
        {database, RepDb},
        {id, null},
        {source, strip_url_creds(Source)},
        {target, strip_url_creds(Target)},
        {state, State},
        {error_count, ErrorCount},
        {info, StateInfo},
        {start_time, StartTime},
        {last_updated, StateTime}
    ]}.

% Map a document state to {ReportedState, Info, ErrorCount, StartTime}.
-spec doc_state_info(atom(), any(), [_]) ->
    {completed | failed | null, any(), 0 | 1, any()}.
doc_state_info(completed, _StateTime, Props) ->
    {Stats} = get_value(<<"_replication_stats">>, Props, {[]}),
    % The start time may be stored under a binary or an atom key; the binary
    % key is tried first.
    {StartTime, Stats1} = take_first_key([<<"start_time">>, start_time], Stats),
    {completed, {Stats1}, 0, StartTime};
doc_state_info(failed, StateTime, Props) ->
    Reason = get_value(<<"_replication_state_reason">>, Props, nil),
    EJsonInfo = couch_replicator_utils:ejson_state_info(Reason),
    {failed, EJsonInfo, 1, StateTime};
doc_state_info(_OtherState, _StateTime, _Props) ->
    {null, null, 0, null}.

% Remove the entry of the first key in Keys that is present in Stats and
% return {Value, RemainingStats}; {null, Stats} when none of the keys is there.
-spec take_first_key([any()], [_]) -> {any(), [_]}.
take_first_key([], Stats) ->
    {null, Stats};
take_first_key([Key | Keys], Stats) ->
    case lists:keytake(Key, 1, Stats) of
        {value, {_, Value}, Rest} ->
            {Value, Rest};
        false ->
            take_first_key(Keys, Stats)
    end.

% Convert a replication state read from a document into an atom.
%
% Atoms are returned as they are. Binaries are converted with
% binary_to_existing_atom/2, so a state name with no existing atom raises
% instead of creating a new atom.
state_atom(Name) when is_atom(Name) ->
    Name;
state_atom(<<"triggered">>) ->
    % This handles a legacy case where the document wasn't converted yet
    triggered;
state_atom(Name) when is_binary(Name) ->
    erlang:binary_to_existing_atom(Name, utf8).

% Decide whether the user in Ctx may act on the scheduled job RepId.
%
% Returns `not_found` when the scheduler has no state for RepId and `ok` when
% the job was created by a user with the same name. For any other existing job
% the decision is delegated to couch_httpd:verify_is_server_admin/1.
-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
    case couch_replicator_scheduler:rep_state(RepId) of
        nil ->
            not_found;
        #rep{user_ctx = #user_ctx{name = Owner}} when Owner =:= Name ->
            ok;
        #rep{} ->
            couch_httpd:verify_is_server_admin(Ctx)
    end.

-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

% EUnit fixture for check_authorization/2. Nothing is set up beforehand; every
% mock installed by a test case is unloaded after that case.
authorization_test_() ->
    Setup = fun() -> ok end,
    Teardown = fun(_) -> meck:unload() end,
    Cases = [
        t_admin_is_always_authorized(),
        t_username_must_match(),
        t_replication_not_found()
    ],
    {foreach, Setup, Teardown, Cases}.

% A caller holding the `_admin` role is authorized for a job that was created
% under a different user name.
t_admin_is_always_authorized() ->
    ?_test(begin
        expect_rep_user_ctx(<<"someuser">>, <<"_admin">>),
        Admin = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
        Verdict = check_authorization(<<"RepId">>, Admin),
        ?assertEqual(ok, Verdict)
    end).

% Without the admin role only the user who created the job is authorized;
% another user with identical roles is rejected with an `unauthorized` throw.
t_username_must_match() ->
    ?_test(begin
        expect_rep_user_ctx(<<"user">>, <<"somerole">>),
        Roles = [<<"somerole">>],
        Owner = #user_ctx{name = <<"user">>, roles = Roles},
        Stranger = #user_ctx{name = <<"other">>, roles = Roles},
        ?assertEqual(ok, check_authorization(<<"RepId">>, Owner)),
        ?assertThrow(
            {unauthorized, _},
            check_authorization(<<"RepId">>, Stranger)
        )
    end).

% When the scheduler has no state for the job, both a regular user and an
% admin get `not_found`.
t_replication_not_found() ->
    ?_test(begin
        meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
        Callers = [
            #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
            #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]}
        ],
        lists:foreach(
            fun(UserCtx) ->
                ?assertEqual(
                    not_found,
                    check_authorization(<<"RepId">>, UserCtx)
                )
            end,
            Callers
        )
    end).

% Mock couch_replicator_scheduler:rep_state/1 so that every job id resolves to
% a #rep{} created by user Name holding the single role Role.
expect_rep_user_ctx(Name, Role) ->
    RepState = fun(_RepId) ->
        #rep{user_ctx = #user_ctx{name = Name, roles = [Role]}}
    end,
    meck:expect(couch_replicator_scheduler, rep_state, RepState).

% EUnit fixture for strip_url_creds/1. config:get/3 is mocked once for the
% whole group to return the supplied default; all mocks are unloaded at the
% end.
strip_url_creds_test_() ->
    Setup = fun() ->
        meck:expect(config, get, fun(_, _, Default) -> Default end)
    end,
    Teardown = fun(_) ->
        meck:unload()
    end,
    Cases = [
        t_strip_http_basic_creds(),
        t_strip_http_props_creds(),
        t_strip_local_db_creds(),
        t_strip_url_creds_errors()
    ],
    {setup, Setup, Teardown, Cases}.

% A plain database name is handed back untouched.
t_strip_local_db_creds() ->
    ?_assertEqual(<<"localdb">>, strip_url_creds(<<"localdb">>)).

% Credentials embedded in the URL are removed for http and https endpoints,
% with an explicit port and with a query string.
t_strip_http_basic_creds() ->
    ?_test(begin
        % {Endpoint URL, expected result}
        Cases = [
            {<<"http://adm:pass@host/db">>, <<"http://host/db/">>},
            {<<"https://adm:pass@host/db">>, <<"https://host/db/">>},
            {<<"http://adm:pass@host:80/db">>, <<"http://host:80/db/">>},
            {<<"http://adm:pass@host/db?a=b&c=d">>, <<"http://host/db?a=b&c=d">>}
        ],
        lists:foreach(
            fun({Url, Stripped}) ->
                ?assertEqual(Stripped, strip_url_creds(Url))
            end,
            Cases
        )
    end).

% Endpoints given as EJSON objects yield the bare URL, whether the credentials
% sit in the URL itself or in an Authorization header.
t_strip_http_props_creds() ->
    ?_test(begin
        WithUserInfo = {[{<<"url">>, <<"http://adm:pass@host/db">>}]},
        WithAuthHeader =
            {[
                {<<"url">>, <<"http://host/db">>},
                {<<"headers">>, {[{<<"Authorization">>, <<"Basic pa55">>}]}}
            ]},
        lists:foreach(
            fun(Endpoint) ->
                ?assertEqual(<<"http://host/db/">>, strip_url_creds(Endpoint))
            end,
            [WithUserInfo, WithAuthHeader]
        )
    end).

% Malformed or unsupported endpoints must all collapse to `null` so that no
% part of a possibly password-bearing URL is exposed.
t_strip_url_creds_errors() ->
    ?_test(begin
        BadEndpoints = [
            {[{<<"url">>, <<"http://adm:pass/bad">>}]},
            {[{<<"garbage">>, <<"more garbage">>}]},
            <<"http://a:b:c">>,
            <<"http://adm:pass:pass/bad">>,
            null,
            42,
            [<<"a">>, <<"b">>],
            {[{<<"source_proxy">>, <<"http://adm:pass/bad">>}]}
        ],
        lists:foreach(
            fun(Bad) ->
                ?assertEqual(null, strip_url_creds(Bad))
            end,
            BadEndpoints
        )
    end).

-endif.
